3f2d5bea20ce647d6fff64a9b2d7a488b005f0c9,ambari-server/src/test/java/org/apache/ambari/server/actionmanager/TestActionScheduler.java,TestActionScheduler,testExclusiveRequests,#,2277

Before Change


    Stage stageInProgress1 = spy(getStageWithSingleTask(
      hostname1, "cluster1", Role.NAMENODE, RoleCommand.START,
      Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1));
    Stage stageInProgress2 = spy(getStageWithSingleTask(
      hostname1, "cluster1", Role.DATANODE, RoleCommand.START,
      Service.Type.HDFS, 2, 2, (int) requestId1));
    Stage stageInProgress3 = spy(getStageWithSingleTask(
      hostname2, "cluster1", Role.DATANODE, RoleCommand.STOP, //Exclusive
      Service.Type.HDFS, 3, 3, (int) requestId2));
    Stage stageInProgress4 = spy(getStageWithSingleTask(
      hostname3, "cluster1", Role.DATANODE, RoleCommand.START,
      Service.Type.HDFS, 4, 4, (int) requestId3));
    stagesInProgress.add(stageInProgress1);
    stagesInProgress.add(stageInProgress2);
    stagesInProgress.add(stageInProgress3);
    stagesInProgress.add(stageInProgress4);


    Host host1 = mock(Host.class);
    when(fsm.getHost(anyString())).thenReturn(host1);
    when(host1.getState()).thenReturn(HostState.HEALTHY);
    when(host1.getHostName()).thenReturn(hostname);
    when(host1.getLastRegistrationTime()).thenReturn(HOST_LAST_REGISTRATION_TIME);

    Host host2 = mock(Host.class);
    when(fsm.getHost(anyString())).thenReturn(host2);
    when(host2.getState()).thenReturn(HostState.HEALTHY);
    when(host2.getHostName()).thenReturn(hostname);
    when(host2.getLastRegistrationTime()).thenReturn(HOST_LAST_REGISTRATION_TIME);

    Host host3 = mock(Host.class);
    when(fsm.getHost(anyString())).thenReturn(host3);
    when(host3.getState()).thenReturn(HostState.HEALTHY);
    when(host3.getHostName()).thenReturn(hostname);
    when(host3.getLastRegistrationTime()).thenReturn(HOST_LAST_REGISTRATION_TIME);

    ActionDBAccessor db = mock(ActionDBAccessor.class);
    when(db.getCommandsInProgressCount()).thenReturn(stagesInProgress.size());
    when(db.getStagesInProgress()).thenReturn(stagesInProgress);

    List<HostRoleCommand> requestTasks = new ArrayList<HostRoleCommand>();
    for (Stage stage : stagesInProgress) {
      requestTasks.addAll(stage.getOrderedHostRoleCommands());
    }
    when(db.getRequestTasks(anyLong())).thenReturn(requestTasks);
    when(db.getAllStages(anyLong())).thenReturn(stagesInProgress);
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        List<CommandReport> reports = (List<CommandReport>) invocation.getArguments()[0];
        for (CommandReport report : reports) {
          String actionId = report.getActionId();
          long[] requestStageIds = StageUtils.getRequestStage(actionId);
          Long requestId = requestStageIds[0];
          Long stageId = requestStageIds[1];
          Long id = report.getTaskId();
          for (Stage stage : stagesInProgress) {
            if (requestId.equals(stage.getRequestId()) && stageId.equals(stage.getStageId())) {
              for (HostRoleCommand hostRoleCommand : stage.getOrderedHostRoleCommands()) {
                if (hostRoleCommand.getTaskId() == id) {
                  hostRoleCommand.setStatus(HostRoleStatus.valueOf(report.getStatus()));
                }
              }
            }
          }

        }

        return null;
      }
    }).when(db).updateHostRoleStates(anyCollectionOf(CommandReport.class));

    when(db.getTask(anyLong())).thenAnswer(new Answer<Object>() {
      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        Long taskId = (Long) invocation.getArguments()[0];
        for (Stage stage : stagesInProgress) {
          for (HostRoleCommand command : stage.getOrderedHostRoleCommands()) {
            if (taskId.equals(command.getTaskId())) {
              return command;
            }
          }
        }
        return null;
      }
    });

    final Map<Long, Boolean> startedRequests = new HashMap<Long, Boolean>();
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        startedRequests.put((Long)invocation.getArguments()[0], true);
        return null;
      }
    }).when(db).startRequest(anyLong());

    RequestEntity request1 = mock(RequestEntity.class);
    when(request1.isExclusive()).thenReturn(false);
    RequestEntity request2 = mock(RequestEntity.class);
    when(request2.isExclusive()).thenReturn(true);
    RequestEntity request3 = mock(RequestEntity.class);
    when(request3.isExclusive()).thenReturn(false);

    when(db.getRequestEntity(requestId1)).thenReturn(request1);
    when(db.getRequestEntity(requestId2)).thenReturn(request2);
    when(db.getRequestEntity(requestId3)).thenReturn(request3);

    Properties properties = new Properties();
    Configuration conf = new Configuration(properties);

    ActionScheduler scheduler = new ActionScheduler(100, 50, db, aq, fsm, 3,
        new HostsMap((String) null), unitOfWork, null, conf);

    doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress1).getLastAttemptTime(anyString(), anyString());
    doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress2).getLastAttemptTime(anyString(), anyString());
    doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress3).getLastAttemptTime(anyString(), anyString());
    doReturn(STAGE_LAST_ATTEMPT_TIME).when(stageInProgress4).getLastAttemptTime(anyString(), anyString());

After Change


    long requestId2 = 2;
    long requestId3 = 3;

    final List<Stage> stagesInProgress = new ArrayList<Stage>();
    int namenodeCmdTaskId = 1;
    stagesInProgress.add(
            getStageWithSingleTask(
                    hostname1, "cluster1", Role.NAMENODE, RoleCommand.START,
                    Service.Type.HDFS, namenodeCmdTaskId, 1, (int) requestId1));
    stagesInProgress.add(
            getStageWithSingleTask(
                    hostname1, "cluster1", Role.DATANODE, RoleCommand.START,
                    Service.Type.HDFS, 2, 2, (int) requestId1));
    stagesInProgress.add(
            getStageWithSingleTask(
                    hostname2, "cluster1", Role.DATANODE, RoleCommand.STOP, //Exclusive